{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Flume是非常流行的日志采集系统，可以作为DStream的高级数据源。本部分将介绍如何让Flume推送消息给Spark Streaming，Spark Streaming收到消息后进行处理。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "把Flume Source设置为**netcat**类型，从终端上不断给Flume Source发送各种消息，Flume把消息汇集到Sink，这里把Sink类型设置为avro，由Sink把消息推送给Spark Streaming，由我们编写的Spark Streaming应用程序对消息进行处理。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from __future__ import print_function\n",
    " \n",
    "import sys\n",
    " \n",
    "from pyspark import SparkContext\n",
    "from pyspark.streaming import StreamingContext\n",
    "from pyspark.streaming.flume import FlumeUtils\n",
    "import pyspark\n",
    "if __name__ == \"__main__\":\n",
    "    if len(sys.argv) != 3:\n",
    "        print(\"Usage: flume_wordcount.py <hostname> <port>\", file=sys.stderr)\n",
    "        exit(-1)\n",
    " \n",
    "    sc = SparkContext(appName=\"FlumeEventCount\")\n",
    "    ssc = StreamingContext(sc, 2)\n",
    " \n",
    "    hostname= sys.argv[1]\n",
    "    port = int(sys.argv[2])\n",
    "    stream = FlumeUtils.createStream(ssc, hostname, port,pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)\n",
    "    stream.count().map(lambda cnt : \"Recieve \" + str(cnt) +\" Flume events!!!!\").pprint()\n",
    " \n",
    "    ssc.start()\n",
    "    ssc.awaitTermination()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
